﻿package org.tisn.qq.net;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Vector;

import org.tisn.qq.QLog;
import org.tisn.qq.packets.PacketParseException;

/**
 * 发送接收线程
 * 
 * @author notxx
 */
public final class Porter extends Thread {
	private final static String TAG = "Porter";
	/** 线程是否结束的标志 */
	protected boolean shutdown = false;
	/** 端口选择器 */
	protected Selector selector;
	
	// port列表
	private List<IPort> ports;
	
	// 连接释放请求
	private Queue<Object> disposeQueue;
	
	// 新连接请求
	private List<Object> newConnections;
	
	/**
	 * 构造一个Porter.
	 */
	public Porter() {
	    ports = new ArrayList<IPort>();
	    newConnections = new Vector<Object>();
	    disposeQueue = new LinkedList<Object>();
		setName("Porter");
		setDaemon(true);
	    // 创建新的Selector
		try {
			selector = Selector.open();
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 注册一个port到porter中
	 * 
	 * @param port
	 * 		IPort实现
	 * @throws ClosedChannelException
	 * 		如果注册失败
	 */
	public void register(IPort port) throws ClosedChannelException {
	    SelectableChannel channel = port.channel();
	    if(channel instanceof SocketChannel)
		    channel.register(selector, SelectionKey.OP_CONNECT, port.getNIOHandler());
	    else if(channel instanceof DatagramChannel)
		    channel.register(selector, SelectionKey.OP_READ, port.getNIOHandler());
	    if(!ports.contains(port))
	        ports.add(port);
	}
	
	/**
	 * 以指定的操作注册channel
	 * 
	 * @param port
	 * @param ops
	 * @throws ClosedChannelException
	 */
	public void register(IPort port, int ops) throws ClosedChannelException {
	    SelectableChannel channel = port.channel();
	    if(channel instanceof SocketChannel)
		    channel.register(selector, ops, port.getNIOHandler());
	    else if(channel instanceof DatagramChannel)
		    channel.register(selector, ops, port.getNIOHandler());
	    if(!ports.contains(port))
	        ports.add(port);
	}
	
	/**
	 * 删除一个port，这个port的channel将被关闭
	 * 
	 * @param port
	 * 		IPort实现
	 * @throws IOException
	 */
	private void deregister(IPort port) {
		if(port == null)
			return;
		
	    if(!ports.remove(port))
	    	return;
    	SelectionKey key = port.channel().keyFor(selector);
    	if(key != null)
    		key.cancel();
        port.dispose();
	}

	/**
	 * 发送错误事件到所有port
	 * 
	 * @param e
	 * 		包含错误信息的Exception
	 */
	private void dispatchErrorToAll(Exception e) {
		for(IPort port : ports)
			port.getNIOHandler().processError(e);
	}
	
	/**
	 * 通知所有port发送包
	 * @throws IOException
	 */
	private void notifySend() {
	    int size = ports.size();
	    for(int i = 0; i < size; i++) {
	        INIOHandler handler = null;
	        try {
		        handler = (ports.get(i)).getNIOHandler();
                handler.processWrite();
            } catch (IOException e) {
	            QLog.e(TAG, e.getMessage());
	            handler.processError(e);
            } catch (IndexOutOfBoundsException e) {                
            }
	    }
	}
	
	/**
	 * 不断运转维护所有注册的IPort对象.
	 * 通过调用它们的几个函数分别做到清空发送队列/填充接收队列/维护队列的功能.
	 * @see IPort#send(ByteBuffer)
	 * @see IPort#receive(ByteBuffer)
	 * @see IPort#maintain()
	 */
	@Override
	public void run() {
		QLog.d(TAG, "Porter已经启动");		
		int n = 0;
	    while(!shutdown) {		    		    
	        // do select
            try {
                n = selector.select(3000);
                // 如果要shutdown，关闭selector退出
                if (shutdown) {
                    selector.close();
                	break;			        
                }
            } catch (IOException e) {
	            QLog.e(e.getMessage());
	            dispatchErrorToAll(e);
            } 
            
            // 处理连接释放请求
            processDisposeQueue();
            
		    // 如果select返回大于0，处理事件
		    if(n > 0) {
		        for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
					// 得到下一个Key
					SelectionKey sk = i.next();
					i.remove();
					// 检查其是否还有效
	                if(!sk.isValid())
	                    continue;

					// 处理
					INIOHandler handler = (INIOHandler)sk.attachment();
		            try {
                        if(sk.isConnectable())
                            handler.processConnect(sk);
                        else if (sk.isReadable())
                            handler.processRead(sk);
                    } catch (IOException e) {
        	            QLog.e(TAG, e.getMessage() , e);
        	            handler.processError(e);
                    } catch (PacketParseException e) {
        	            QLog.d(TAG, "包解析错误: " + e.getMessage(), e);
    	            } catch (RuntimeException e) {
    	                QLog.e(TAG, "运行时错误: " + e.getMessage(), e);
    	           }
		        }
		        
		        n = 0;
		    }
		    
		    checkNewConnection();		    
		    notifySend();		    
		}
	    
        selector = null;
        shutdown = false;
		QLog.d(TAG, "Porter已经退出");
	}
	
	/**
	 * 添加释放请求
	 * 
	 * @param p
	 */
	public void addDisposeRequest(IPort p) {
		synchronized(disposeQueue) {
			disposeQueue.offer(p);
		}
	}
	
    /**
     * 检查是否有新连接要加入
     */
    private void checkNewConnection() {	 
        while(!newConnections.isEmpty()) {
            Object handler = newConnections.remove(0);
            if(handler instanceof IPort) {
		        try {
	                register((IPort)handler);
	            } catch (ClosedChannelException e1) {
	            }
	        }            
        }
    }
    
    /**
     * 处理连接释放请求
     */
    private void processDisposeQueue() {
    	synchronized(disposeQueue) {
    		while(!disposeQueue.isEmpty()) {
    			Object obj = disposeQueue.poll();
    			if(obj instanceof IPort)
    				deregister((IPort)obj);
    		}
    	}
    }

    /**
     * 关闭porter
     */
    public void shutdown() {
	    if(selector != null) {
		    shutdown = true;
	        selector.wakeup();	        
	    }
    }
    
    /**
     * 唤醒selector
     */
    public void wakeup() {
        selector.wakeup();
    }
    
    /**
     * 唤醒selector然后注册这个proxy
     * 
     * @param proxy
     */
    public void wakeup(Object handler) {
        newConnections.add(handler);
        selector.wakeup();
    }
}